package org.jiucheng.magpiebridge.server.aio;

import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.CompletionHandler;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jiucheng.magpiebridge.protocol.Message;
import org.jiucheng.magpiebridge.server.aio.proxy.ProxyReadCompletionHandler;
import org.jiucheng.magpiebridge.server.aio.proxy.ProxyAttachment;
import org.jiucheng.magpiebridge.server.aio.proxy.ProxyWriteCompletionHandler;
import org.jiucheng.magpiebridge.server.util.Container;
import org.jiucheng.magpiebridge.server.util.ServerCfg;
import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentAttachment;
import org.jiucheng.magpiebridge.server.aio.proxy.ProxyEstablishmentCompletionHandler;
import org.jiucheng.magpiebridge.util.ThreadManager;

/**
 * 
 * @author jiucheng
 *
 */
public class ServerReadCompletionHandler implements CompletionHandler<Integer, ServerAttachment> {
    private static final Logger LOGGER = Logger.getLogger(ServerReadCompletionHandler.class.getName());
    
    public void completed(Integer result, ServerAttachment attachment) {
        // client关闭连接
        if (result == -1) {
            close(attachment);
            return;
        }
        
        ByteBuffer readByteBuffer = attachment.getReadBuffer(); 
        if (readByteBuffer.position() != readByteBuffer.capacity()) {
            attachment.getServer().read(readByteBuffer, attachment, this);
            return;
        }
        
        if (readByteBuffer.capacity() == 13) {
            // 验证消息头
            readByteBuffer.flip();
            int magic = readByteBuffer.getInt();
            byte type = readByteBuffer.get();
            int uri = readByteBuffer.getInt();
            int size = readByteBuffer.getInt();
            if (magic != Message.MAGIC) {
                close(attachment);
                return;
            }
            if (size > 0) {
                readByteBuffer = ByteBuffer.allocate(size + 13);
                readByteBuffer.putInt(magic);
                readByteBuffer.put(type);
                readByteBuffer.putInt(uri);
                readByteBuffer.putInt(size);
                attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
                return;
            }
        }
        
        Message message = Message.fromByteBuffer(readByteBuffer);
        byte type = message.getType();
        if (type == Message.Type.AUTH) {
            // 认证
            String clientKey = new String(message.getData());
            // 认证
            if (clientKey == null || clientKey.trim().length() < 1) {
                close(attachment);
                return;
            }
            // 初始化代理服务
            if (!initProxy(clientKey, attachment)) {
                close(attachment);
                return;
            }
            readByteBuffer = ByteBuffer.allocate(13);
            attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
        } else if (type == Message.Type.HEARTBEAT) {
            readByteBuffer.clear();
            attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
        } else if (type == Message.Type.TRANSFER) {
            ProxyAttachment proxyAttachment = ServerAttachment.proxys.get(message.getUri());
            if (proxyAttachment != null && proxyAttachment.canWrited()) {
                ByteBuffer writeByteBuffer = ByteBuffer.allocate(message.getSize());
                writeByteBuffer.put(message.getData());
                writeByteBuffer.flip();
                
                proxyAttachment.getAsynchronousSocketChannel().write(writeByteBuffer, proxyAttachment.setWriteBuffer(writeByteBuffer), new ProxyWriteCompletionHandler());
                
                readByteBuffer = ByteBuffer.allocate(13);
                attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
            }
        } else if (type == Message.Type.CONNECT) {
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.log(Level.FINE, "CONNECT uri=" + message.getUri());
            }
        	ProxyAttachment proxyAttachment = ServerAttachment.proxys.get(message.getUri());
            if (proxyAttachment != null && proxyAttachment.getAsynchronousSocketChannel() != null) {
            	attachment.setProxyAttachment(proxyAttachment);
            	proxyAttachment.setServerAttachment(attachment);
            	if (proxyAttachment.getReadBuffer() == null) {
            	    if (LOGGER.isLoggable(Level.FINE)) {
            	        LOGGER.log(Level.FINE, "readBuffer IS NULL");
            	    }
            	    // 16KB
                	proxyAttachment.setReadBuffer(ByteBuffer.allocate(16 * 1024));
                	proxyAttachment.getAsynchronousSocketChannel().read(proxyAttachment.getReadBuffer(), proxyAttachment, new ProxyReadCompletionHandler());
            	} else {
                    if (LOGGER.isLoggable(Level.FINE)) {
                        LOGGER.log(Level.FINE, "readBuffer position=" + proxyAttachment.getReadBuffer().position());
                    }
            		new ProxyReadCompletionHandler().completed(proxyAttachment.getReadBuffer().position(), proxyAttachment);
            	}
            }
            readByteBuffer = ByteBuffer.allocate(13);
            attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
        } else if (type == Message.Type.DISCONNECT) {
        	if (attachment.isMastered()) {
                readByteBuffer.clear();
                attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
                
                ProxyAttachment proxyAttachment = ServerAttachment.proxys.remove(message.getUri());
                if (proxyAttachment != null) {
                	if (proxyAttachment.canWrited()) {
                		proxyAttachment.close();
                	}
                }
        	} else {
        		attachment.close();
        	}
        } else if (type == Message.Type.PROXYRESET) {
            if ("Y".equalsIgnoreCase(ServerCfg.getServerMappingsHttpOpened())) {
                String serverTokenAndclientToken = new String(message.getData());
                if (serverTokenAndclientToken != null && !serverTokenAndclientToken.isEmpty()) {
                    String[] strs = serverTokenAndclientToken.split("#");
                    if (strs.length == 2 && strs[0].equalsIgnoreCase(ServerCfg.getServerMappingsHttpToken())) {
                        String clientKey = strs[1];
                        ServerAttachment serverAttachment = Container.CLIENTS.get(clientKey);
                        resetInitProxy(clientKey, serverAttachment);
                    }
                }
            }
            close(attachment);
        } else {
            readByteBuffer = ByteBuffer.allocate(13);
            attachment.getServer().read(readByteBuffer, attachment.setReadBuffer(readByteBuffer), this);
        }
    }
    
    private synchronized static void resetInitProxy(String clientKey, ServerAttachment attachment) {
        if (attachment != null && attachment.isMastered()) {
            ServerAttachment old = Container.CLIENTS.get(clientKey);
            if (attachment != old)
                return;
            String serverMappings = ServerCfg.getServerMappings(clientKey);
            if (serverMappings == null || serverMappings.trim().isEmpty()) {
                Container.CLIENTS.remove(clientKey);
                attachment.close();
                return;
            }
            attachment.clear();
            String[] mappings = serverMappings.split(",");
            for (String mapping : mappings) {
                String[] port2remote = mapping.split("/");
                String local = port2remote[0];
                String remote = port2remote[1];
                if (local.indexOf(":") != -1) {
                    try {
                        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(ThreadManager.singleton());
                        AsynchronousServerSocketChannel proxy = AsynchronousServerSocketChannel.open(group);
                        proxy.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                        proxy.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024);
                        
                        String[] locals = local.split(":");
                        
                        proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1])));
                        proxy.accept(new ProxyEstablishmentAttachment(proxy).setServerAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler());
                        if (LOGGER.isLoggable(Level.INFO)) {
                            LOGGER.log(Level.INFO, MessageFormat.format("tcp port={0} -> remote={1}", local, remote));
                        }
                        attachment.addProxyServer(proxy);
                    } catch (Exception e) {
                        if (LOGGER.isLoggable(Level.SEVERE)) {
                            LOGGER.log(Level.SEVERE, MessageFormat.format("tcp port={0} -> remote={1}", local, remote));
                        }
                        e.printStackTrace();
                    }
                } else {
                    // port2remote=testng.jiucheng.org/127.0.0.1:8080
                    if (local != null && !local.isEmpty()) {
                        if (!Container.DOMAINS.contains(local)) {
                            attachment.addDomain(local);
                            List<Object> item = new ArrayList<Object>();
                            item.add(attachment);
                            item.add(remote);
                            Container.DOMAINS.put(local, item);
                            if (LOGGER.isLoggable(Level.INFO)) {
                                LOGGER.log(Level.INFO, MessageFormat.format("http11 domain={0} -> remote={1}", local, remote));
                            }
                        }
                    }
                }
            }
        }
    }
    
    private synchronized static boolean initProxy(String clientKey, ServerAttachment attachment) {
    	attachment.setMastered(true);
        ServerAttachment clientAttachment = Container.CLIENTS.remove(clientKey);
        if (clientAttachment != null) {
            clientAttachment.close();
        }
        
        String serverMappings = ServerCfg.getServerMappings(clientKey);
        if (serverMappings == null || serverMappings.trim().isEmpty()) {
            return false;
        }
        
        Container.CLIENTS.put(clientKey, attachment);
        
        String[] mappings = serverMappings.split(",");
        for (String mapping : mappings) {
            String[] port2remote = mapping.split("/");
            String local = port2remote[0];
            String remote = port2remote[1];
            if (local.indexOf(":") != -1) {
                try {
                    AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(ThreadManager.singleton());
                    AsynchronousServerSocketChannel proxy = AsynchronousServerSocketChannel.open(group);
                    proxy.setOption(StandardSocketOptions.SO_REUSEADDR, true);
                    proxy.setOption(StandardSocketOptions.SO_RCVBUF, 8 * 1024 * 1024);
                    
                    String[] locals = local.split(":");
                    
                    proxy.bind(new InetSocketAddress(locals[0], Integer.valueOf(locals[1])));
                    proxy.accept(new ProxyEstablishmentAttachment(proxy).setServerAttachment(attachment).setRemote(remote), new ProxyEstablishmentCompletionHandler());
                    if (LOGGER.isLoggable(Level.INFO)) {
                        LOGGER.log(Level.INFO, MessageFormat.format("tcp port={0} -> remote={1}", local, remote));
                    }
                    attachment.addProxyServer(proxy);
                } catch (Exception e) {
                    if (LOGGER.isLoggable(Level.SEVERE)) {
                        LOGGER.log(Level.SEVERE, MessageFormat.format("tcp port={0} -> remote={1}", local, remote));
                    }
                    e.printStackTrace();
                }
            } else {
            	// port2remote=testng.jiucheng.org/127.0.0.1:8080
            	if (local != null && !local.isEmpty()) {
            		if (!Container.DOMAINS.contains(local)) {
            			attachment.addDomain(local);
            			List<Object> item = new ArrayList<Object>();
            			item.add(attachment);
            			item.add(remote);
            			Container.DOMAINS.put(local, item);
	                    if (LOGGER.isLoggable(Level.INFO)) {
	                        LOGGER.log(Level.INFO, MessageFormat.format("http11 domain={0} -> remote={1}", local, remote));
	                    }
            		}
            	}
            }
        }
        return true;
    }

    public void failed(Throwable exc, ServerAttachment attachment) {
        close(attachment);
    }
    
    private void close(ServerAttachment attachment) {
        attachment.close();
    }
}
